/* Copyright (c) 2022-2022, LiWangQian<liwangqian@huawei.com> All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without modification,
 * are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this list of
 *    conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice, this list
 *    of conditions and the following disclaimer in the documentation and/or other materials
 *    provided with the distribution.
 *
 * 3. Neither the name of the copyright holder nor the names of its contributors may be used
 *    to endorse or promote products derived from this software without specific prior written
 *    permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef LOGPP_ASYNC_SINK_H
#define LOGPP_ASYNC_SINK_H

#include "logpp/configs.h"
#include "logpp/sinks/isink.h"
#include "logpp/buffers/circular_buffer.h"
#include "logpp/buffers/ringbuffer.h"

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>

LOGPP_NS_BEGIN

// Decorator sink that hands messages to a background thread.
//
// Producers call output(), which swaps the message into a bounded buffer
// (`cached_`). A single worker thread swaps that buffer with its private one
// (`running_`) and forwards every message to the wrapped sink.
//
// Ownership: the wrapped sink passed to the constructor is released with
// delete_object() in the destructor, so the caller must not free it.
//
// Threading: output() may be called from producer threads while the worker is
// running. start() and stop() are expected to be called from one controlling
// thread; they are not synchronised against each other.
template <typename SinkImpl = isink>
class async_sink : public isink {
public:
    using this_type = async_sink<SinkImpl>;
    using sink_type = SinkImpl;

    // Stops the worker (draining what is already buffered), prints the debug
    // statistics and releases the worker thread and the wrapped sink.
    ~async_sink()
    {
        stop();
        print_statistic_info();
        release_worker();
        if (sink_ != nullptr) delete_object(sink_);
    }

    // Wraps `sink_impl` and takes ownership of it. The worker is not started
    // here; call start() before logging.
    explicit async_sink(sink_type *sink_impl)
        : sink_{sink_impl}
    {
    }

    // The class owns raw pointers and a thread, so copying is never valid.
    async_sink(const async_sink &) = delete;
    async_sink &operator=(const async_sink &) = delete;

    // No-op: the wrapped sink is opened by the worker around each batch.
    void open() noexcept override {}

    // No-op: the wrapped sink is closed by the worker after each batch.
    void close() noexcept override {}

    // Queues `m` for asynchronous output.
    //
    // While the buffer is full this blocks (yielding between attempts) until
    // the worker makes room. If the sink is not running the message is
    // dropped. On success the content of `m` is swapped into the buffer, so
    // the caller must not rely on `m` afterwards.
    void output(message &m) noexcept override
    {
        while (is_running() && !try_cache_message(m)) {
            // Back-pressure: give the worker a chance to take the mutex and
            // drain the buffer instead of spinning on the lock.
            std::this_thread::yield();
        }
    }

    // No-op: the worker flushes the wrapped sink after each batch.
    void flush() noexcept override {}

    // Reports the state of the wrapped sink; false when there is none.
    bool is_open() const noexcept override
    {
        return sink_ != nullptr && sink_->is_open();
    }

    // Launches the worker thread.
    //
    // Does nothing when the worker is already running (a second worker would
    // leak the first thread and race on `running_`) or when there is no
    // wrapped sink to write to. A worker left over from an earlier stop() is
    // released first so that restarting does not leak it. On any failure the
    // sink stays in the stopped state.
    void start() noexcept
    {
        if (is_running() || sink_ == nullptr) return;
        release_worker();
        try {
            stop_ = false;
            worker_ = new_object<std::thread>(&async_sink::message_loop, this);
            if (worker_ == nullptr) stop_ = true;
        } catch (...) {
            // worker_ is still nullptr here: the assignment above never ran.
            stop_ = true;
        }
    }

    // Asks the worker to finish and waits for it. Messages that are already
    // buffered are still written before the worker exits.
    void stop() noexcept
    {
        {
            // The flag must change under the mutex: otherwise the worker could
            // evaluate the wait predicate, miss the notification below and
            // sleep forever, which would make join() hang.
            std::lock_guard<std::mutex> lock{mtx_};
            stop_ = true;
        }
        cv_.notify_all();
        if (worker_ && worker_->joinable()) {
            worker_->join();
        }
    }

    // True while a worker exists and no stop has been requested.
    bool is_running() const noexcept
    {
        return worker_ != nullptr && !stop_;
    }

private:
    // Joins (if still joinable) and frees the worker thread object.
    void release_worker() noexcept
    {
        if (worker_ == nullptr) return;
        if (worker_->joinable()) worker_->join();
        delete_object(worker_);
        worker_ = nullptr;
    }

    // Tries to swap `m` into the producer-side buffer; wakes the worker on
    // success. Returns false when the buffer rejected the message.
    bool try_cache_message(message &m) noexcept
    {
        std::unique_lock<std::mutex> lock{mtx_};
        bool pushed = cached_.try_push_swap(m);
        if (pushed) cv_.notify_all();
        return pushed;
    }

    // Worker entry point: write batches until a stop was requested and the
    // producer-side buffer has been fully drained.
    void message_loop() noexcept
    {
        message m;  // scratch message reused for every pop
        while (wait_messages()) {
            output_messages(m);
        }
    }

    // Blocks until there is something to write or a stop was requested.
    //
    // Returns true after moving the pending messages into `running_`, and
    // false when the worker should exit (stop requested, nothing pending).
    // The emptiness check is done under the mutex, so it cannot race with
    // producers pushing into `cached_`.
    bool wait_messages() noexcept
    {
        std::unique_lock<std::mutex> lock{mtx_};
        cv_.wait(lock, [this]() { return !cached_.empty() || stop_; });
        if (cached_.empty()) return false;
        // `running_` was fully drained by the previous output_messages() call,
        // so producers get an empty buffer back.
        cached_.swap(running_);
        return true;
    }

    // Writes every message in `running_` to the wrapped sink as one batch
    // (open, output..., flush, close) and updates the debug statistics.
    void output_messages(message &m) noexcept
    {
        auto cnt = running_.size();
        if (cnt == 0) return;

        auto begin = time::clock::now();
        sink_->open();
        while (running_.try_pop_swap(m)) {
            sink_->output(m);
        }
        sink_->flush();
        sink_->close();
        auto finish = time::clock::now();
        total_sink_ += cnt;
        // NOTE(review): reported as nanoseconds; assumes time::clock ticks in ns.
        time_ns_ += (finish - begin).count();
    }

    // Debug aid: prints how many messages were written and the time spent in
    // the wrapped sink.
    void print_statistic_info() noexcept
    {
        std::cout << "[async-sink] messages count: " << total_sink_ << ", elapse: "
                  << time_ns_ << " ns"
                  << std::endl;
    }

    std::mutex mtx_;                                      // guards cached_ and stop_ transitions
    std::condition_variable cv_;                          // signals "message queued" or "stop requested"
    ringbuffer<message> cached_{LOGPP_LOG_BUFFER_SIZE};   // producer side, accessed under mtx_
    ringbuffer<message> running_{LOGPP_LOG_BUFFER_SIZE};  // consumer side, touched by the worker only
    sink_type *sink_{nullptr};                            // owned wrapped sink
    std::thread *worker_{nullptr};                        // owned worker thread, nullptr when none
    std::atomic_bool stop_{true};                         // true until start() succeeds

    // debug (written by the worker, read after it has been joined)
    size_t total_sink_{0};
    uint64_t time_ns_{0};
};

LOGPP_NS_END

#endif
